package top.im

import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import top.im.messages._
import io.undertow.websockets.core._
import top.im.messages.{Here, NewConnect, WhereIAm}


/**
  * Created by jf on 16/1/23.
  *
  * Actor that owns one `WebSocketServer` child per connection id and routes
  * push traffic to it.
  *
  * Messages handled:
  *  - `WhereIAm`: replies to the sender with `Here(wxPath)`.
  *  - `NewConnect`: creates a `WebSocketServer` child named after the
  *    connection id and wires the WebSocket channel's callbacks to it.
  *  - `Push`: sends `PushMessage` to the child with the matching id, or to
  *    the sender of the `Push` when no such child exists.
  *  - `MemberUp` / `MemberRemoved`: keeps `nodesHash` in step with the
  *    cluster membership.
  *
  * @param wxPath value returned inside `Here` in answer to `WhereIAm`
  */
class PushServer(wxPath: String) extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  val nodesHash = new ConsistencyHash[Address]()

  /** Subscribes to membership events; the current state is replayed as events. */
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])

  /** Drops the cluster subscription taken in `preStart`. */
  override def postStop(): Unit = cluster.unsubscribe(self)

  override def receive: Receive = {
    case x: WhereIAm =>
      log.info("WhereIAm : {}", x)
      sender() ! Here(wxPath)

    case x: NewConnect =>
      context.child(x.id) match {
        case Some(_) =>
          // Child names must be unique: calling actorOf again with the same name
          // throws InvalidActorNameException, which would fail this actor and
          // take the live connection down with it. Keep the existing child.
          // NOTE(review): the duplicate channel is left untouched here (as it was
          // before, when the exception aborted handling) - confirm whether it
          // should be closed or should replace the existing connection instead.
          log.warning("Connection [{}] is already registered, ignoring duplicate NewConnect", x.id)
        case None =>
          attachConnection(x)
      }

    case x: Push =>
      log.info("Push : {}", x)
      context.child(x.id).getOrElse(sender()) ! PushMessage(x.message)

    case MemberUp(member) =>
      log.info("node [{}] joined", member.address)
      nodesHash.add(member.address)

    case MemberRemoved(member, _) =>
      nodesHash.remove(member.address)

    // Remaining events delivered by the subscription in preStart are not needed;
    // swallow them so they are not reported as unhandled messages.
    case _: MemberEvent | _: UnreachableMember => ()
  }

  /**
    * Creates the `WebSocketServer` child for a new connection, forwards the
    * channel's text and close frames to it, and resumes reading from the channel.
    */
  private def attachConnection(connect: NewConnect): Unit = {
    val webSocketServer = context.actorOf(Props(classOf[WebSocketServer], connect.channel), connect.id)

    // The listener is invoked by Undertow rather than by this actor, so it only
    // performs message sends to the child reference.
    connect.channel.getReceiveSetter.set(new AbstractReceiveListener() {
      override def onFullTextMessage(channel: WebSocketChannel, message: BufferedTextMessage): Unit =
        webSocketServer ! message

      override def onCloseMessage(cm: CloseMessage, channel: WebSocketChannel): Unit = {
        // Let the default close handling run first, then notify the child.
        super.onCloseMessage(cm, channel)
        webSocketServer ! cm
      }
    })
    connect.channel.resumeReceives()
  }

}


/**
  * Cluster Sharding wiring for [[PushServer]]: message extractors plus a
  * factory that starts the shard region.
  */
object PushServer {

  /** Number of shards the entity ids are spread over (shard ids "0" to "99"). */
  private val NumberOfShards = 100

  /**
    * Maps an entity id to its shard id.
    *
    * The modulo is taken before `math.abs`: `math.abs(Int.MinValue)` is still
    * negative, so `math.abs(hash) % n` would produce a negative shard id for an
    * id whose hash code is `Int.MinValue`. For every other hash code both forms
    * give the same result.
    */
  private def shardIdOf(entityId: String): String =
    math.abs(entityId.hashCode % NumberOfShards).toString

  /** Uses the message's `id` as the entity id and passes the message on unchanged. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case c: NewConnect => (c.id, c)
    case p: Push => (p.id, p)
    case w: WhereIAm => (w.id, w)
  }

  /** Derives the shard id from the message's `id`; always within 0 until `NumberOfShards`. */
  val extractShardId: ShardRegion.ExtractShardId = {
    case c: NewConnect => shardIdOf(c.id)
    case p: Push => shardIdOf(p.id)
    case w: WhereIAm => shardIdOf(w.id)
  }

  /**
    * Starts the "push" shard region on the given actor system.
    *
    * @param wxPath constructor argument handed to every `PushServer` entity
    * @param system actor system on which Cluster Sharding is started
    * @return the `ActorRef` returned by `ClusterSharding.start`
    */
  def apply(wxPath: String)(implicit system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      typeName = "push",
      entityProps = Props(classOf[PushServer], wxPath),
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
}



